//  Copyright (c) 2020-present,  INSPUR Co, Ltd.  All rights reserved.
// This source code is licensed under Apache 2.0 License.

#ifndef ROCKSDB_WALHANDLER_H
#define ROCKSDB_WALHANDLER_H

#include <atomic>
#include <ctime>
#include <deque>
#include <limits>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "db/flush_scheduler.h"
#include "db/internal_stats.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/transaction_log_impl.h"
#include "db/version_set.h"
#include "env/mock_env.h"
#include "options/db_options.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/status.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/types.h"
#include "util/cast_util.h"

namespace rocksdb {
// In a memory engine, a lot of data is held in memory, so a WAL (pre-written
// log file) is no longer suitable for holding all the data in memory.In order
// to make Wal compatible with the memory engine, the Wal management module is
// used to manage the pre-write log files.
class WALHandler {
 public:
  WALHandler(const DBOptions& db_options, VersionSet* versions);
  WALHandler() = delete;
  WALHandler(const WALHandler&) = delete;
  WALHandler& operator=(const WALHandler&) = delete;
  ~WALHandler() = default;
  // Creation of a single WAL
  Status Create();
  // Before you can use the WAL management module, you need to load the
  // internal statistifier
  Status LoadWAL(InternalStats* internal_stats = nullptr);
  Status DeleteFile(tm* time);
  Status DeleteFile(SequenceNumber number);
  // WAL write implementation
  Status Write(const WriteOptions& write_options, WriteBatch* my_batch, DB* db, FlushScheduler* flush_scheduler);
  Status PipelineWrite(const WriteOptions& write_options,
                                   WriteBatch* my_batch, DB* db, FlushScheduler* flush_scheduler);
  // The shutdown of the WAL management module
  void Close();
  // Deferred processing after each batch is committed in writeGroup.Wake up the
  // leader of the next writegroup writer thread.
  void PostProcess();

  uint64_t GetLogfileNumber() const;
  Status RecoverLogFilesToTables(ColumnFamilyMemTables* memtables,
                                 SequenceNumber& last_sequence);

  Status TEST_Iterate(int& data_sum, std::vector<std::string>& str);
  const std::string& TEST_GetPath() const { return wal_dir_; }

 private:
  uint64_t getLastLogfileNumber() const;
  uint64_t getTotalLogfileSize() const;

  Status preprocessWrite();
  bool readRecord(Slice* record, std::string* scratch);
  Status deleteWAL(uint64_t number);

  static WriteBatch* mergeBatch(const WriteThread::WriteGroup& write_group,
                                WriteBatch* tmp_batch, size_t* write_with_wal,
                                WriteBatch** to_be_cached_state,
                                SequenceNumber seq);

  Status writeToWAL(const WriteBatch& merged_batch, log::Writer* log_writer,
                    uint64_t* log_used, uint64_t* log_size);

  Status iterateRecord(const std::string& fname, uint64_t number,
                       int& data_sum, std::vector<std::string>& str);
  Status rename(const std::string& src, const std::string& target);
  Status acquireWALFiles();
  Status getSortedWALFiles(VectorLogPtr& log_files);
  Status sync();

  int64_t envGetHour();
  int64_t envGetNowTime();
 private:
  Env* env_;

  const DBOptions initial_db_options_;
  const ImmutableDBOptions db_options_;
  MutableDBOptions mutable_db_options_;

  WriteThread write_thread_;
  // The options to access storage files
  const EnvOptions env_options_;

  std::string wal_dir_;
  VersionSet* versions_;
  Statistics* stats_;
  // The sequence number of the log file name.The first half of the log file
  // name is determined by the time of the last WAL modification.The second half
  // of the log file name is determined by the number of files at this point in
  // time
  int64_t logfile_number_;
  int64_t logfile_number_prefix_ = 0;
  int64_t logfile_number_suffix_ = 0;
  // The number of threads in a group in group commit
  std::atomic<uint64_t> wait_commit_batch_num_;
  bool last_commit_batch_completed_;
  std::atomic<uint64_t> total_log_size_;
  // WAL file size limits
  uint64_t max_wal_size_ = 64 << 20;

  std::atomic<uint64_t> last_sequence_;

  std::vector<log::Reader*> readers_;
  uint64_t reader_index_ = 0;
  // In group commit mode, a lock used for multithreading management
  std::mutex mutex_;
  std::condition_variable condvar_;

  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;

    Status* status;
    bool ignore_error;  // true if db_options_.paranoid_checks==false
    void Corruption(size_t bytes, const Status& s) override {
      ROCKS_LOG_WARN(info_log, "[WalManager] %s%s: dropping %d bytes; %s",
                     (this->ignore_error ? "(ignoring error) " : ""), fname,
                     static_cast<int>(bytes), s.ToString().c_str());
      if (this->status->ok()) {
        // only keep the first error
        *this->status = s;
      }
    }
  };

  struct LogFileInfo {
    // pass ownership of _writer
    LogFileInfo(uint64_t _number, uint64_t _size)
        : number(_number), size(_size) {}
    void ResetNumber(uint64_t new_number) { number = new_number; }
    void AddSize(uint64_t new_size) { size += new_size; }

    uint64_t number;
    // Visual Studio doesn't support deque's member to be noncopyable because
    // of a std::unique_ptr as a member.
    uint64_t size;
    // true for some prefix of logs_
    bool getting_synced = false;
  };
  // The tag information used to record the recovery effort
  bool is_recovered_ = true;
  std::deque<LogFileInfo> logs_;
  log::Writer* log_writer_ = nullptr;
  bool log_empty_ = true;
  // The time of the last fixed drop
  int64_t last_create_wal_per_hour_;
  InternalStats* default_cf_internal_stats_ = nullptr;

  struct CompareLogByPointer {
    bool operator()(const std::unique_ptr<LogFile>& a,
                    const std::unique_ptr<LogFile>& b) {
      auto a_impl = static_cast_with_check<LogFileImpl, LogFile>(a.get());
      auto b_impl = static_cast_with_check<LogFileImpl, LogFile>(b.get());
      return *a_impl < *b_impl;
    }
  };
};
}  // namespace rocksdb

#endif  // ROCKSDB_WALHANDLER_H